In [ ]:
import pandas as pd
import elasticsearch
import json
import re
es = elasticsearch.Elasticsearch()
In this example, we use the Google Geocoding API to translate addresses into geo-coordinates. Google imposes usage limits on the API, so if you are using this script to index the full dataset, you may need to sign up for an API key to raise those limits.
In [ ]:
from geopy.geocoders import GoogleV3
geolocator = GoogleV3()
# geolocator = GoogleV3(api_key=<your_google_api_key>)
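Before geocoding thousands of rows, it is worth confirming that the geocoder responds at all. A minimal sanity check; the address below is only an illustration:
In [ ]:
# illustrative sanity check: geocode one well-known address
location = geolocator.geocode('350 5th Ave, New York, NY', timeout=10)
if location is not None:
    print(location.latitude, location.longitude)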
In [ ]:
t = pd.read_csv('https://data.cityofnewyork.us/api/views/43nn-pn8j/rows.csv?accessType=DOWNLOAD',
                header=0, sep=',', dtype={'PHONE': str, 'INSPECTION DATE': str})
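Before transforming anything, a quick look at what was downloaded helps confirm the columns and row count:
In [ ]:
# inspect the raw download
print(t.shape)
t.head()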
In [ ]:
## Helper Functions
from datetime import datetime

def str_to_iso(text):
    # parse an M/D/Y date string into ISO-8601 format; return None if empty
    if text != '':
        for fmt in ['%m/%d/%Y']:
            try:
                return datetime.isoformat(datetime.strptime(text, fmt))
            except ValueError:
                pass
    else:
        return None

def getLatLon(row):
    # geocode the full address first, then fall back to the zip code
    if row['Address'] != '':
        location = geolocator.geocode(row['Address'], timeout=10000)
        if location is not None:
            # GeoJSON/Elasticsearch array order is [lon, lat]
            return [location.longitude, location.latitude]
    if row['Zipcode'] != '':
        location = geolocator.geocode(row['Zipcode'], timeout=10000)
        if location is not None:
            return [location.longitude, location.latitude]
    return None

def getAddress(row):
    # combine Building, Street and Boro into a single geocodable string
    if row['Building'] != '' and row['Street'] != '' and row['Boro'] != '':
        x = row['Building'] + ' ' + row['Street'] + ' ' + row['Boro'] + ',NY'
        return re.sub(' +', ' ', x)
    return ''

def combineCT(x):
    # unique inspection key: date prefix (YYYY-MM-DD) + restaurant id (Camis)
    return str(x['Inspection_Date'])[0:10] + '_' + str(x['Camis'])
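A quick check of the helpers on hand-written inputs (illustrative values, not rows from the dataset):
In [ ]:
# illustrative inputs, not real records
print(str_to_iso('03/24/2015'))  # -> 2015-03-24T00:00:00
print(getAddress({'Building': '100', 'Street': 'BROADWAY', 'Boro': 'MANHATTAN'}))  # -> 100 BROADWAY MANHATTAN,NY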
In [ ]:
# process column names: title-case and replace spaces with underscores
t.columns = [c.title().replace(' ', '_') for c in t.columns]
# replace NaN with ''
t.fillna('', inplace=True)
# convert dates to ISO format
t['Inspection_Date'] = t['Inspection_Date'].map(str_to_iso)
t['Record_Date'] = t['Record_Date'].map(str_to_iso)
t['Grade_Date'] = t['Grade_Date'].map(str_to_iso)
# combine Building, Street and Boro to create the Address string
t['Address'] = t.apply(getAddress, axis=1)
Create a dictionary of unique addresses. We do this to avoid calling the Google Geocoding API multiple times for the same address.
In [ ]:
addDict = t[['Address','Zipcode']].copy(deep=True)
addDict = addDict.drop_duplicates()
addDict['Coord'] = [None]* len(addDict)
Get the geo-coordinates for each unique address. This step can take a while because it calls the Google Geocoding API once per unique address.
In [ ]:
for item_id, item in addDict.iterrows():
    # progress indicator
    if item_id % 100 == 0:
        print(item_id)
    # skip addresses that already have coordinates
    if addDict.at[item_id, 'Coord'] is None:
        addDict.at[item_id, 'Coord'] = getLatLon(item)

# Save the address dictionary to CSV so a long geocoding run can be reused
# addDict.to_csv('./dict_final.csv')
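Since geocoding is slow and rate-limited, persisting the dictionary pays off across runs. A sketch of resuming from a previously saved file, assuming it was written by the commented-out to_csv call above (Coord round-trips through CSV as a string, so it has to be parsed back into a list):
In [ ]:
import ast
import os

# resume from an earlier geocoding run, if a saved dictionary exists
if os.path.exists('./dict_final.csv'):
    cached = pd.read_csv('./dict_final.csv', index_col=0)
    cached.fillna('', inplace=True)
    # Coord is stored as a string like '[-73.98, 40.74]'; parse it back
    cached['Coord'] = cached['Coord'].map(
        lambda v: ast.literal_eval(v) if v != '' else None)
    addDict = cached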
In [ ]:
# Merge coordinates into the original table
t1 = t.merge(addDict[['Address', 'Coord']])
# Keep only one Score and Grade value per inspection
t2 = t1.copy(deep=True)
t2['raw_num'] = t2.index
t2['RI'] = t2.apply(combineCT, axis=1)
# row number of the first record of each inspection
yy = t2.groupby('RI').first().reset_index()['raw_num']
t2['Unique_Score'] = None
t2.loc[yy.values, 'Unique_Score'] = t2['Score'].loc[yy.values]
t2['Unique_Grade'] = None
t2.loc[yy.values, 'Unique_Grade'] = t2['Grade'].loc[yy.values]
del t2['RI']
del t2['raw_num']
del t2['Grade']
del t2['Score']
t2.rename(columns={'Unique_Grade': 'Grade', 'Unique_Score': 'Score'}, inplace=True)
t2['Grade'].fillna('', inplace=True)
In [ ]:
# spot-check one processed record
t2.iloc[1]
In [ ]:
### Create and configure the Elasticsearch index
# Name of index and document type
index_name = 'nyc_restaurants'
doc_name = 'inspection'
# Delete the nyc_restaurants index if it already exists
if es.indices.exists(index_name):
    es.indices.delete(index_name)
# Create the nyc_restaurants index
es.indices.create(index_name)
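The contents of inspection_mapping.json are not shown in this notebook. For geo queries to work, the Coord field must be mapped as a geo_point; a minimal sketch of such a mapping, keyed by the document type as older Elasticsearch versions expect (illustrative only, your file may differ):
In [ ]:
# illustrative mapping; the real inspection_mapping.json may differ
example_mapping = {
    "inspection": {
        "properties": {
            "Coord": {"type": "geo_point"},
            "Inspection_Date": {"type": "date"},
            "Record_Date": {"type": "date"},
            "Grade_Date": {"type": "date"}
        }
    }
}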
In [ ]:
# Add mapping
with open('./inspection_mapping.json') as json_mapping:
    d = json.load(json_mapping)
es.indices.put_mapping(index=index_name, doc_type=doc_name, body=d)

# Index data
for item_id, item in t2.iterrows():
    # progress indicator
    if item_id % 1000 == 0:
        print(item_id)
    thisItem = item.to_dict()
    thisDoc = json.dumps(thisItem)
    # write the document to Elasticsearch
    es.index(index=index_name, doc_type=doc_name, id=item_id, body=thisDoc)
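Once indexing finishes, a quick search confirms the documents are queryable. A sketch of a geo_distance filter, assuming Coord is mapped as a geo_point; the point below is an arbitrary midtown Manhattan coordinate, and the exact query syntax depends on your Elasticsearch version:
In [ ]:
# illustrative query: restaurants within 1 km of an arbitrary midtown point
q = {
    "query": {
        "bool": {
            "filter": {
                "geo_distance": {
                    "distance": "1km",
                    "Coord": [-73.98, 40.75]  # [lon, lat]
                }
            }
        }
    }
}
res = es.search(index=index_name, body=q)
print(res['hits']['total'])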